Add TableSink operator with Java/Spark implementations#665
harrygav wants to merge 1 commit into apache:main
Conversation
Thanks @harrygav, this is great! Could we make …
Thank you! Just to get the tests running, how about mocking the JDBC layer? Wrap …

I will take a look and update the PR to continue the discussion!
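One way the mocking idea could be shaped is to route connection creation through a small seam that tests can replace. This is only an illustrative sketch, not the PR's actual code: `ConnectionFactory` and the dynamic-proxy stand-in are hypothetical names.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;

public class MockJdbcSketch {

    // Hypothetical seam: the sink asks a factory for connections instead of
    // calling DriverManager directly, so tests can inject a mock.
    interface ConnectionFactory {
        Connection open() throws Exception;
    }

    static int opened = 0;

    public static void main(String[] args) throws Exception {
        ConnectionFactory mock = () -> {
            opened++;
            // A do-nothing Connection via a dynamic proxy, enough for wiring tests.
            return (Connection) Proxy.newProxyInstance(
                Connection.class.getClassLoader(),
                new Class<?>[]{Connection.class},
                (proxy, method, a) -> {
                    // Return benign defaults for primitive-returning methods.
                    Class<?> r = method.getReturnType();
                    if (r == boolean.class) return false;
                    if (r.isPrimitive() && r != void.class) return 0;
                    return null;
                });
        };
        try (Connection c = mock.open()) {
            System.out.println("opened=" + opened);
        }
    }
}
```

A sink written against such a seam can be unit-tested without any running database; the real factory would simply delegate to DriverManager.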
Hey @harrygav, any news on this? Apparently a table sink is crucial for many things, and we would like to start using it already.
Also, another question: wouldn't it make sense to also have a sink for a database? Right now you have implemented one execution operator for Java and one for Spark, but why not one for a database?
Hi @zkaoudi, nice to hear that the PR will be useful; I will follow up on this by the end of the week! Thanks for your input. I think there are many things to be clarified for the sink operator, but I guess we will figure them out once we know more about the targeted use cases we want to cover. With the current implementation, you could do the ETL pipeline you mention through the Java or Spark platforms, e.g., Source(Java/Spark from DBMS1)->ETL(Java/Spark)->Sink(Java/Spark to DBMS2). Or were you thinking of writing from DBMS1 into DBMS2 directly, without any intermediate Java/Spark platform step? That would also be interesting for some use cases (improved performance) but becomes cumbersome to maintain in terms of interoperability. Let me know what you think!
Yes, I was thinking about directly writing from DBMS1 to DBMS2. For example, if you have two tables in two DBMSs and you want to join them and write the result into DBMS2 without doing the join in Spark or Java. What do you mean by issues of interoperability?
But on second thought, what I described above as a scenario is more like a conversion operator in addition to a sink. You would ideally want to create a temp table to do the join and then persist the result.
Hi all, picking up this one again. I just pushed a commit with the update:
I think it would be wise to add a couple of DBMSes for the tests, which would also be useful for the source operators or for supporting JDBC platforms themselves. This could be done either through their embedded versions or through Maven Testcontainers. Then, we could add support for sinks on other platforms, e.g., the JDBC platform, to also support DBMS->DBMS workloads. Let me know what you think about the PR, and whether we want to do some of the next steps (e.g., testing) here or in another PR.
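For reference, either option mentioned above is roughly a one-dependency change in the affected module's pom. A possible shape (versions are illustrative, not what the PR pins):

```xml
<!-- Embedded option: H2 in-memory database for fast unit tests -->
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <version>2.2.224</version>
  <scope>test</scope>
</dependency>
<!-- Container option: a real PostgreSQL spun up per test run -->
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <version>1.19.7</version>
  <scope>test</scope>
</dependency>
```

The embedded route keeps CI self-contained; Testcontainers exercises real driver/dialect behavior at the cost of requiring Docker on the build machine.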
.asf.yaml (outdated)

```diff
- description: Apache Wayang is the first cross-platform data processing system.
- homepage: https://wayang.apache.org/
+ description: Apache Wayang(incubating) is the first cross-platform data processing system.
+ homepage: https://wayang.incubator.apache.org/
```
Do not modify this file, it seems it is an old version.
Sorry, this file change mistakenly sneaked in after the rebase.
Thanks a lot for your contribution, Harry!
I can squash the commits and update the PR. Anything else to address?
All seems good to me. We can merge. Thank you @harrygav
@harrygav Just remember to clean up the rheem references and I'm also happy :)
Force-pushed from 7cd4822 to ccac0b2
Thanks @mspruc, I replaced the old rheem references with wayang and squashed the commits. Let me know if you would like any other changes!
@harrygav On my end you still have references to rheem in the Java and Spark table sinks.
…plementation and tests
Force-pushed from ccac0b2 to b2a59c4
Pull request overview
This PR introduces a new TableSink unary sink operator (in wayang-basic) and adds platform-specific implementations for Java and Spark to write Record/POJO data into JDBC tables, including basic schema inference via the new SqlTypeUtils.
Changes:
- Added `TableSink` operator plus `SqlTypeUtils` for JDBC dialect/type/schema inference.
- Added `JavaTableSink` implementation and accompanying H2-based unit tests.
- Added `SparkTableSink` implementation and accompanying H2-based unit tests; updated platform/module POM dependencies.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 17 comments.
Show a summary per file
| File | Description |
|---|---|
| wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkTableSinkTest.java | Adds Spark sink tests against H2 (but currently contains Checkstyle-breaking unused imports). |
| wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java | Implements Spark JDBC writing with Record schema derivation and POJO handling. |
| wayang-platforms/wayang-spark/pom.xml | Adds JDBC driver dependencies for Spark module (currently includes PostgreSQL in compile scope). |
| wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTableSinkTest.java | Adds Java sink tests against H2. |
| wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java | Implements JDBC table creation and batch insertion for Java platform. |
| wayang-platforms/wayang-java/pom.xml | Adds test dependencies (H2 + PostgreSQL). |
| wayang-commons/wayang-basic/src/test/java/org/apache/wayang/basic/util/SqlTypeUtilsTest.java | Adds tests for dialect detection and schema/type mapping. |
| wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/util/SqlTypeUtils.java | Adds utilities for JDBC URL product detection + Java→SQL type/schema mapping. |
| wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/TableSink.java | Adds the new logical operator holding table name, mode, props, and optional column names. |
| wayang-commons/wayang-basic/pom.xml | Adds Calcite dependency to support SqlDialect.DatabaseProduct. |
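The JDBC-URL product detection that `SqlTypeUtils` provides can be illustrated with a standalone sketch. This is hypothetical code mirroring the described behavior, not the PR's actual implementation: a JDBC URL's second colon-separated token names the database product.

```java
import java.util.Locale;
import java.util.Map;

public class JdbcProductSketch {

    // Illustrative subset of products; the real utility maps to
    // Calcite's SqlDialect.DatabaseProduct.
    static final Map<String, String> KNOWN = Map.of(
        "postgresql", "POSTGRESQL",
        "h2", "H2",
        "mysql", "MYSQL");

    static String detect(String jdbcUrl) {
        // JDBC URLs look like jdbc:<subprotocol>:<subname>.
        String[] parts = jdbcUrl.split(":");
        if (parts.length < 2 || !"jdbc".equals(parts[0])) {
            throw new IllegalArgumentException("not a JDBC URL: " + jdbcUrl);
        }
        return KNOWN.getOrDefault(parts[1].toLowerCase(Locale.ROOT), "UNKNOWN");
    }

    public static void main(String[] args) {
        System.out.println(detect("jdbc:h2:mem:test"));
        System.out.println(detect("jdbc:postgresql://localhost:5432/db"));
    }
}
```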
Flagged imports in SparkTableSinkTest.java:

```java
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.spark.channels.RddChannel;
import org.apache.wayang.spark.platform.SparkPlatform;
```

This test has unused imports (OptimizationContext, OutputSlot, SparkPlatform), which will fail the repository Checkstyle run (google_checks.xml includes UnusedImports). Please remove the unused imports (and any now-unneeded mocking setup) to keep the build green.

Suggested change:

```java
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.spark.channels.RddChannel;
```
Flagged dependency in wayang-commons/wayang-basic/pom.xml:

```xml
  <groupId>org.apache.calcite</groupId>
  <artifactId>calcite-core</artifactId>
  <version>${calcite.version}</version>
</dependency>
<dependency>
```

Adding calcite-core to wayang-basic just to access SqlDialect.DatabaseProduct significantly increases the dependency footprint of a low-level module and can introduce version-conflict pressure across the build. Consider replacing this with a small internal enum (or moving dialect-related utilities into a module that already depends on Calcite) to keep wayang-basic lightweight.

Suggested change: remove the calcite-core dependency block.
```java
if (recordRDD.isEmpty()) {
    return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
}
Record first = (Record) recordRDD.first();
```

For Record inputs, this does two Spark actions (isEmpty() and then first()), which triggers two jobs and can be expensive. Prefer a single take(1) (or similar) to both detect emptiness and obtain a sample record for schema inference.

Suggested change:

```java
List<T> sample = recordRDD.take(1);
if (sample.isEmpty()) {
    return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
}
Record first = (Record) sample.get(0);
```
```xml
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>42.7.2</version>
  <scope>test</scope>
</dependency>
```

org.postgresql:postgresql is added as a test dependency in the Java platform module, but the new tests use H2 and there is no usage of the PostgreSQL driver in this module's test sources. If it's not needed for other tests, removing it will reduce build time and dependency surface; otherwise, consider adding a test that actually covers PostgreSQL-specific behavior.

Suggested change: remove the PostgreSQL test dependency.
```java
public static List<SchemaField> getSchema(Class<?> cls, SqlDialect.DatabaseProduct product) {
    List<SchemaField> schema = new ArrayList<>();
    if (cls == Record.class) {
        // For Record.class without an instance, we can't derive names/types easily
        // Users should use the instance-based getSchema or provide columnNames
        return schema;
    }

    for (Field field : cls.getDeclaredFields()) {
        if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
            continue;
        }
        schema.add(new SchemaField(field.getName(), field.getType(), getSqlType(field.getType(), product)));
    }
    return schema;
```

POJO schema derivation uses getDeclaredFields(), but value extraction uses ReflectionUtils.getProperty(...), which requires matching getX() getters. This will fail for private fields without getters (even though they are included in the schema) and can also include fields that should not be persisted. Consider deriving the schema from JavaBeans properties/getters (or at least filtering to fields that have corresponding getters).
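A getter-based derivation as suggested could look like this standalone sketch (class and property names are illustrative): only readable JavaBeans properties become schema entries, so every derived column is guaranteed to be extractable later.

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanSchemaSketch {

    public static class Person {
        private String name;
        private int age;
        private String secret; // no getter -> excluded from the schema
        public String getName() { return name; }
        public int getAge() { return age; }
    }

    public static void main(String[] args) throws Exception {
        // Introspect bean properties, stopping before Object's methods.
        BeanInfo info = Introspector.getBeanInfo(Person.class, Object.class);
        List<String> columns = new ArrayList<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getReadMethod() != null) { // keep only readable properties
                columns.add(pd.getName());
            }
        }
        columns.sort(String::compareTo); // deterministic column order
        System.out.println(columns);
    }
}
```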
```java
recordIterator.forEachRemaining(
    r -> {
        try {
            this.pushToStatement(ps, r, typeClass, finalColumnNames);
            ps.addBatch();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    });
```

recordIterator.forEachRemaining swallows SQLException via printStackTrace() and keeps building the batch, which can lead to partial or incorrect writes without failing the operator. Propagate the failure (e.g., wrap it in a runtime exception), then abort/rollback and surface it as a WayangException so the job fails deterministically.
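The wrap-and-propagate pattern suggested above can be shown in isolation (a sketch; `BatchException` and `bind` are illustrative stand-ins for the real wrapper and pushToStatement/addBatch):

```java
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

public class PropagateSketch {

    // Unchecked wrapper so the checked SQLException can escape the lambda.
    static class BatchException extends RuntimeException {
        BatchException(Throwable cause) { super(cause); }
    }

    // Stands in for pushToStatement(...) + addBatch().
    static void bind(String row) throws SQLException {
        if (row.equals("boom")) throw new SQLException("bad row: " + row);
    }

    public static void main(String[] args) {
        Iterator<String> it = List.of("ok", "boom", "never-reached").iterator();
        try {
            it.forEachRemaining(r -> {
                try {
                    bind(r);
                } catch (SQLException e) {
                    throw new BatchException(e); // abort instead of printStackTrace()
                }
            });
        } catch (BatchException e) {
            // The operator would rollback here and rethrow as WayangException.
            System.out.println("aborted: " + e.getCause().getMessage());
        }
    }
}
```

Note that iteration stops at the failing row, so no further rows are added to a batch that will never commit.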
```java
if (typeClass == Record.class) {
    Record r = (Record) element;
    for (int i = 0; i < columnNames.length; i++) {
        setRecordValue(ps, i + 1, r.getField(i));
    }
} else {
```

For Record inputs, the code assumes columnNames.length <= record.size() and indexes fields by position. If columnNames is longer than the Record, r.getField(i) will throw at runtime. Please validate the lengths early (and give a clear error) or derive columnNames from the record size when not provided.
```java
for (Field field : cls.getDeclaredFields()) {
    if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
        continue;
    }
    schema.add(new SchemaField(field.getName(), field.getType(), getSqlType(field.getType(), product)));
}
return schema;
```

Class#getDeclaredFields() does not guarantee a stable order across JVMs/compilers, but the derived schema order determines column ordering (and tests assume a specific order). To avoid nondeterministic schemas, sort fields deterministically (e.g., by name) or use a stable property-introspection approach.
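The deterministic-ordering fix can be as small as sorting the reflected fields before deriving columns; a standalone sketch (class name illustrative):

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Comparator;

public class StableFieldOrder {

    static class Row { int b; int a; int c; }

    public static void main(String[] args) {
        // getDeclaredFields() order is unspecified by the JLS, so sort by
        // name to get the same column order on every JVM.
        Field[] fields = Row.class.getDeclaredFields();
        Arrays.sort(fields, Comparator.comparing(Field::getName));
        StringBuilder cols = new StringBuilder();
        for (Field f : fields) cols.append(f.getName());
        System.out.println(cols);
    }
}
```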
```java
// Update column names in the operator if they were generated
String[] newColNames = schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new);
this.setColumnNames(newColNames);
```

this.setColumnNames(...) mutates the operator based on runtime data (the first record). Combined with TableSink copying columnNames by reference, this can leak schema decisions across copies/reuses. Prefer keeping inferred column names local to the evaluation (or ensure defensive copies in TableSink and avoid mutating shared state).

Suggested change: remove these lines.
```java
// If columnNames are provided, we should probably select/rename them,
// but usually createDataFrame(rdd, beanClass) maps fields to columns.
if (this.getColumnNames() != null && this.getColumnNames().length > 0) {
    // Optionally filter or reorder columns to match this.getColumnNames()
    // For now, Spark's native mapping is preferred.
```

In the POJO branch, columnNames passed to TableSink are silently ignored (the code comments mention it, but no behavior is applied). This makes the API misleading, because callers might expect renaming, reordering, or subset selection. Either implement the mapping (select/rename) or validate and reject columnNames for POJO inputs with a clear error.

Suggested change:

```java
// For POJOs, we currently do not support custom columnNames to avoid
// ambiguous or misleading mappings. Fail fast if they are provided.
String[] columnNames = this.getColumnNames();
if (columnNames != null && columnNames.length > 0) {
    throw new WayangException(
        "columnNames are not supported for POJO inputs in SparkTableSink. " +
        "Either omit columnNames or use Record inputs if you need custom column mapping.");
```
Summary
This PR introduces a new TableSink operator for writing Record data into a database table via JDBC, with implementations for the Java and Spark platforms. Opening as Draft to start discussion on the operator design and expected behavior.

Changes
New operator: TableSink (in wayang-basic)
- UnarySink<Record> that targets a table name and accepts JDBC connection Properties
- mode (e.g. overwrite) and optional column names

Java platform: JavaTableSink (in wayang-java)
- overwrite by dropping the target table first

Spark platform: SparkTableSink (in wayang-spark)
- implements the TableSink operator

Notes / open questions
- … VARCHARs)
- mode behavior (overwrite vs append, etc.) should be agreed on and formalized.

How to use / test
To run end-to-end locally, you currently need an external PostgreSQL instance available and to provide JDBC connection details (driver/url/user/password) in the test setup/environment.
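The connection details might be wired up via plain java.util.Properties, e.g. as below. The keys and values here are illustrative placeholders matching the driver/url/user/password list above, not the actual test configuration:

```java
import java.util.Properties;

public class SinkConfigSketch {
    public static void main(String[] args) {
        // Hypothetical JDBC settings a local end-to-end run would need.
        Properties props = new Properties();
        props.setProperty("driver", "org.postgresql.Driver");
        props.setProperty("url", "jdbc:postgresql://localhost:5432/wayang_test");
        props.setProperty("user", "wayang");
        props.setProperty("password", "secret");
        System.out.println(props.getProperty("url"));
    }
}
```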